In [1]:
sc
Out[1]:
In [2]:
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.regression._
Out[2]:
In [7]:
// Load the raw training CSV as an RDD of lines (one String per row).
val rawDatain = sc.textFile("/Users/wy/Desktop/advance_Spark/train.csv")
Out[7]:
In [8]:
// Grab the first line (the CSV header row) so it can be filtered out below.
val header = rawDatain.first
Out[8]:
In [9]:
// Drop the header row, keeping only data lines.
val rawData = rawDatain.filter(line => line != header)
Out[9]:
In [10]:
// Sanity check: peek at the first two data rows.
rawData.take(2)
Out[10]:
In [11]:
// Parse each CSV row into a LabeledPoint: columns 1-10 become the feature
// vector and column 11 the target; column 0 (presumably a row id) is dropped.
val data = rawData.map { line =>
  val fields = line.split(",")
  // Columns 1..11 inclusive. Indexing (rather than slice) preserves the
  // original behavior of throwing on malformed/short rows.
  val values = (1 to 11).map(i => fields(i).toDouble).toArray
  val featureVector = Vectors.dense(values.init)
  // NOTE(review): target is shifted down by 1 — presumably the raw label is
  // 1-based; confirm this offset is intended for a regression target.
  val label = values.last - 1
  LabeledPoint(label, featureVector)
}
Out[11]:
In [12]:
// Sanity check: inspect the first two parsed LabeledPoints.
data.take(2)
Out[12]:
In [13]:
// 80/20 train/test split. A fixed seed makes the split — and therefore every
// downstream metric in this notebook — reproducible across re-runs.
val Array(trainData, testData) = data.randomSplit(Array(0.8, 0.2), seed = 42L)
Out[13]:
In [14]:
// Keep both splits in memory: they are re-scanned by every training run and
// evaluation pass below.
trainData.cache()
testData.cache()
Out[14]:
In [15]:
import org.apache.spark.mllib.evaluation._
import org.apache.spark.mllib.tree._
import org.apache.spark.mllib.tree.model._
import org.apache.spark.rdd._
Out[15]:
In [16]:
// Decision-tree hyperparameters.
val categoricalFeaturesInfo = Map.empty[Int, Int] // empty: all features are treated as continuous
val impurity = "variance"                         // regression criterion
val maxDepth = 5
val maxBins = 32
Out[16]:
In [17]:
// Fit a single regression tree on the training split.
val model = DecisionTree.trainRegressor(trainData, categoricalFeaturesInfo, impurity,maxDepth, maxBins)
Out[17]:
In [19]:
// Pair each held-out example's true label with the tree's prediction.
val labelsAndPredictions = testData.map { point =>
  (point.label, model.predict(point.features))
}
Out[19]:
In [20]:
// Mean squared error of the tree on the held-out set.
val testMSE = labelsAndPredictions
  .map { case (label, pred) => math.pow(label - pred, 2) }
  .mean()
println("Test Mean Squared Error = " + testMSE)
println("Learned regression tree model:\n" + model.toDebugString)
In [90]:
// Grid search over tree hyperparameters, reporting held-out MSE for each
// combination so the best setting can be picked by inspection.
for {
  impurity <- Array("variance")
  maxDepth <- Array(3, 5)
  maxBins  <- Array(16, 32)
} {
  val model = DecisionTree.trainRegressor(trainData, Map[Int, Int](), impurity, maxDepth, maxBins)
  val labelsAndPredictions = testData.map { point =>
    (point.label, model.predict(point.features))
  }
  val testMSE = labelsAndPredictions.map { case (v, p) => math.pow(v - p, 2) }.mean()
  // Report impurity too: the original print dropped it, so sweeping more than
  // one impurity would have produced indistinguishable result lines.
  println(((impurity, maxDepth, maxBins), testMSE))
}
In [111]:
// Persisting the fitted tree (commented out — uncomment to save/reload):
// model.save(sc, "/Users/wy/Desktop/advance_Spark/model")
// val sameModel = DecisionTreeModel.load(sc, "/Users/wy/Desktop/advance_Spark/model")
// Dummy assignment so this otherwise comment-only cell evaluates cleanly.
val tmp =1
Out[111]:
In [91]:
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.mllib.util.MLUtils
In [92]:
// Random-forest hyperparameters.
val categoricalFeaturesInfo = Map.empty[Int, Int] // empty: all features continuous
val numTrees = 3                   // kept small for the demo; use more in practice
val featureSubsetStrategy = "auto" // let the algorithm choose features per node
val impurity = "variance"          // regression criterion
val maxDepth = 4
val maxBins = 32
Out[92]:
In [93]:
// Fit a random forest (ensemble of regression trees) on the training split.
val model = RandomForest.trainRegressor(trainData, categoricalFeaturesInfo,numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)
Out[93]:
In [94]:
// Pair each held-out example's true label with the forest's prediction.
val labelsAndPredictions = testData.map { point =>
  (point.label, model.predict(point.features))
}
Out[94]:
In [99]:
// Sanity check: peek at two (label, prediction) pairs.
labelsAndPredictions.take(2)
Out[99]:
In [108]:
// Mean squared error of the forest on the held-out set.
val testMSE = labelsAndPredictions
  .map { case (label, pred) => math.pow(label - pred, 2) }
  .mean()
println("Test Mean Squared Error = " + testMSE)
println("Learned regression forest model:\n" + model.toDebugString)
In [112]:
// Persisting the fitted forest (commented out — uncomment to save/reload):
// model.save(sc, "myModelPath")
// val sameModel = RandomForestModel.load(sc, "myModelPath")
// Dummy assignment so this otherwise comment-only cell evaluates cleanly.
val tmp =1
Out[112]:
In [110]:
import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel
In [113]:
// Gradient-boosted trees configuration: start from the regression defaults,
// then override the knobs we care about (the strategy object is mutable).
val boostingStrategy = BoostingStrategy.defaultParams("Regression")
boostingStrategy.numIterations = 3 // boosting rounds; small for the demo
boostingStrategy.treeStrategy.maxDepth = 5
boostingStrategy.treeStrategy.categoricalFeaturesInfo = Map[Int, Int]() // all features continuous
Out[113]:
In [114]:
// Fit a gradient-boosted trees model on the training split.
val model = GradientBoostedTrees.train(trainData, boostingStrategy)
Out[114]:
In [115]:
// Pair each held-out example's true label with the GBT prediction.
val labelsAndPredictions = testData.map { point =>
  (point.label, model.predict(point.features))
}
Out[115]:
In [116]:
// Mean squared error of the boosted ensemble on the held-out set.
val testMSE = labelsAndPredictions
  .map { case (label, pred) => math.pow(label - pred, 2) }
  .mean()
println("Test Mean Squared Error = " + testMSE)
println("Learned regression GBT model:\n" + model.toDebugString)
In [118]:
// model.save(sc, "myModelPath")
// val sameModel = GradientBoostedTreesModel.load(sc, "myModelPath")
// avoid incomplete useless val tmp =1
val tmp =1
Out[118]:
In [ ]: